package org.apache.seatunnel.connectors.seatunnel.gaussdbmongodb.sink;

import org.apache.seatunnel.connectors.seatunnel.gaussdbmongodb.serde.SerializableFunction;
import org.bson.BsonDocument;

import java.util.Arrays;
import java.util.stream.Collectors;

public class MongoKeyExtractor implements SerializableFunction<BsonDocument, BsonDocument> {

    private static final long serialVersionUID = 1L;

    private final String[] upsertKey;

    public MongoKeyExtractor(MongodbWriterOptions options) {
        upsertKey = options.getUpsertKey();
    }

    @Override
    public BsonDocument apply(BsonDocument bsonDocument) {
        return Arrays.stream(upsertKey)
                .filter(bsonDocument::containsKey)
                .collect(
                        Collectors.toMap(
                                key -> key, bsonDocument::get, (v1, v2) -> v1, BsonDocument::new));
    }
}
